package com.my.service.task.map;
import com.alibaba.fastjson.JSONObject;
import com.my.service.task.entity.UserTypeInfo;
import com.my.service.task.kafka.KafkaEvent;
import com.my.service.task.log.ScanProductLog;
import com.my.service.task.util.HbaseUtils;
import com.my.service.task.utils.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/**
 * Flink operator that maintains a per-user "preferred terminal" profile.
 *
 * <p>For every incoming product-scan log it:
 * <ol>
 *   <li>reads the user's per-terminal view counters from HBase,</li>
 *   <li>increments the counter of the terminal the log came from and writes the counters back,</li>
 *   <li>emits a {@code -1} record for the previously preferred terminal when the preference
 *       changed, followed by a {@code +1} record for the currently preferred terminal,</li>
 *   <li>stores the currently preferred terminal back into HBase.</li>
 * </ol>
 */
public class UserTypeMap implements FlatMapFunction<KafkaEvent, UserTypeInfo> {
    private static final String TABLE_NAME = "userflaginfo";
    private static final String FAMILY_NAME = "userbehavior";
    /** Column holding the JSON map "terminal name -> view count". */
    private static final String COLUMN_TYPE_COUNTS = "usertypelist";
    /** Column holding the name of the terminal with the highest view count. */
    private static final String COLUMN_TYPE_LIKE = "usertypelike";
    private static final String GROUP_PREFIX = "==usetypeinfo==";

    /**
     * Processes one Kafka event carrying a JSON-encoded {@link ScanProductLog}.
     *
     * @param kafkaEvent event whose {@code getWord()} is the JSON log payload
     * @param collector  receives the preference-change records
     * @throws Exception propagated from JSON parsing or the HBase helpers
     */
    @Override
    public void flatMap(KafkaEvent kafkaEvent, Collector<UserTypeInfo> collector) throws Exception {
        // The payload is the log sent while the user dwells on a product (ScanProductLog).
        String data = kafkaEvent.getWord();
        if (StringUtils.isBlank(data)) {
            // Nothing to parse; skip instead of failing on a null log object below.
            return;
        }
        ScanProductLog scanProductLog = JSONObject.parseObject(data, ScanProductLog.class);
        if (scanProductLog == null) {
            // Guard in case the payload parses to null (e.g. the JSON literal null).
            return;
        }

        int userId = scanProductLog.getUserId();
        int userType = scanProductLog.getUserType(); // 0 = PC, 1 = mobile, anything else = mini-program
        String userTypeName = terminalName(userType);
        String rowkey = userId + "";

        // Per-terminal view counters previously stored for this user (may be absent).
        String storedCounts = HbaseUtils.getdata(TABLE_NAME, rowkey, FAMILY_NAME, COLUMN_TYPE_COUNTS);
        Map<String, Long> counts = parseCounts(storedCounts);

        // Preference before this event is applied; blank/null when the user has no history.
        String preMostLikeUserType = MapUtils.getKeyOfMaxValue(counts);

        counts.merge(userTypeName, 1L, Long::sum);
        HbaseUtils.putdata(TABLE_NAME, rowkey, FAMILY_NAME, COLUMN_TYPE_COUNTS,
                JSONObject.toJSONString(counts));

        String nowMostLikeUserType = MapUtils.getKeyOfMaxValue(counts);

        // The preference switched: retract the user from the previously preferred terminal.
        if (StringUtils.isNotBlank(preMostLikeUserType)
                && !preMostLikeUserType.equals(nowMostLikeUserType)) {
            collector.collect(buildInfo(preMostLikeUserType, -1L));
        }

        // counts is non-empty at this point, so a max key is expected to exist.
        assert nowMostLikeUserType != null;

        // Emit the record for the current preference (the original built it but never collected it).
        // NOTE(review): this is emitted for every event, also when the preference did not change;
        // confirm that the downstream aggregation expects that.
        collector.collect(buildInfo(nowMostLikeUserType, 1L));

        HbaseUtils.putdata(TABLE_NAME, rowkey, FAMILY_NAME, COLUMN_TYPE_LIKE, nowMostLikeUserType);
    }

    /** Maps the numeric terminal code of a log to its display name. */
    private static String terminalName(int userType) {
        if (userType == 0) {
            return "pc端";
        }
        if (userType == 1) {
            return "移动端";
        }
        return "小程序端";
    }

    /**
     * Parses the stored JSON counters into a genuinely typed {@code Map<String, Long>}.
     *
     * <p>Parsing into a raw {@code Map} leaves small numbers as {@code Integer}, which fails with
     * a {@code ClassCastException} as soon as a value is read as {@code Long}; converting through
     * {@code JSONObject.getLong} avoids that.
     *
     * @param json stored counters, may be blank
     * @return a mutable map, empty when nothing usable is stored
     */
    private static Map<String, Long> parseCounts(String json) {
        Map<String, Long> counts = new HashMap<>();
        if (StringUtils.isBlank(json)) {
            return counts;
        }
        JSONObject stored = JSONObject.parseObject(json);
        if (stored == null) {
            return counts;
        }
        for (String key : stored.keySet()) {
            Long value = stored.getLong(key);
            if (value != null) {
                counts.put(key, value);
            }
        }
        return counts;
    }

    /** Builds one output record for the given terminal name and count delta. */
    private static UserTypeInfo buildInfo(String userType, long count) {
        UserTypeInfo info = new UserTypeInfo();
        info.setUserType(userType);
        info.setCount(count);
        info.setGroupbyfield(GROUP_PREFIX + userType);
        return info;
    }
}
